{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Yes we need both these imports\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.functions import col, to_date, lit, isnull\n",
    "from pyspark.sql.types import *\n",
    "from pyspark.sql.types import StructField, StructType\n",
    "from pyspark.sql.catalog import UserDefinedFunction\n",
    "from pyspark.ml.feature import *\n",
    "from pyspark.ml.pipeline import Pipeline\n",
    "import os"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "fs_prefix = \"s3a://kf-book-examples/mailing-lists\" # Create with mc as in ch1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "os.environ[\"PYSPARK_PYTHON\"] = \"python3.6\"\n",
    "# See https://medium.com/@szinck/setting-up-pyspark-jupyter-and-minio-on-kubeflow-kubernetes-aab98874794f\n",
    "session = (SparkSession.builder\n",
    "           .appName(\"processMailingListData\")\n",
    "           .config(\"spark.executor.instances\", \"8\")\n",
    "           .config(\"spark.driver.memoryOverhead\", \"0.25\")\n",
    "           .config(\"spark.executor.memory\", \"10g\")\n",
    "           .config(\"spark.dynamicAllocation.enabled\", \"false\")\n",
    "           .config(\"spark.ui.enabled\", \"true\")\n",
    "           .config(\"spark.kubernetes.container.image\",\n",
    "                   \"gcr.io/boos-demo-projects-are-rad/kubeflow/spark-worker/spark-py-36:v3.0.0-preview2-23\")\n",
    "           .config(\"spark.driver.bindAddress\", \"0.0.0.0\")\n",
    "           .config(\"spark.kubernetes.namespace\", \"kubeflow-programmerboo\")\n",
    "           .config(\"spark.master\", \"k8s://https://kubernetes.default\")\n",
    "           .config(\"spark.driver.host\", \"spark-driver.kubeflow-programmerboo.svc.cluster.local\")\n",
    "           .config(\"spark.kubernetes.executor.annotation.sidecar.istio.io/inject\", \"false\")\n",
    "           .config(\"spark.driver.port\", \"39235\")\n",
    "           .config(\"spark.blockManager.port\", \"39236\")\n",
    "            # If using minio - see https://github.com/minio/cookbook/blob/master/docs/apache-spark-with-minio.md\n",
    "           .config(\"spark.hadoop.fs.s3a.endpoint\", \"minio-service.kubeflow.svc.cluster.local:9000\")\n",
    "           .config(\"fs.s3a.connection.ssl.enabled\", \"false\")\n",
    "           .config(\"fs.s3a.path.style.access\", \"true\")\n",
    "           # You can also add an account using the minio command as described in chapter 1\n",
    "           .config(\"spark.hadoop.fs.s3a.access.key\", \"minio\")\n",
    "           .config(\"spark.hadoop.fs.s3a.secret.key\", \"minio123\")\n",
    "          ).getOrCreate()\n",
    "sc = session.sparkContext"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Load data from the previous stage\n",
    "#tag::load_data[]\n",
    "initial_posts = session.read.format(\"parquet\").load(fs_prefix + \"/initial_posts\")\n",
    "ids_in_reply = session.read.format(\"parquet\").load(fs_prefix + \"/ids_in_reply\")\n",
    "#end::load_data[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load data from the previous stage while checking the schema\n",
    "#tag::load_with_schema[]\n",
    "ids_schema = StructType([\n",
    "    StructField(\"In-Reply-To\", StringType(), nullable=True),\n",
    "    StructField(\"message-id\", StringType(),nullable=True)])\n",
    "ids_in_reply = session.read.format(\"parquet\").schema(ids_schema).load(fs_prefix + \"/ids_in_reply\")\n",
    "#end::load_with_schema[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Cache the data\n",
    "initial_posts = initial_posts.alias(\"initial_posts\").cache()\n",
    "ids_in_reply = ids_in_reply.alias(\"ids_in_reply\").cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# We can write random SQL -- although we need to wait for preview 3 cause it was taken out in preview1\n",
    "#tag::direct_sql[]\n",
    "#ids_in_reply.registerTempTable(\"cheese\")\n",
    "#no_text = session.sql(\"select * from cheese where body = '' AND subject = ''\")\n",
    "#end::direct_sql[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Drop bad data\n",
    "#tag::drop_bad_fields[]\n",
    "initial_posts_count = initial_posts.count()\n",
    "initial_posts_cleaned = initial_posts.na.drop(how='any', subset=['body', 'from'])\n",
    "initial_posts_cleaned_count = initial_posts_cleaned.count()\n",
    "#end::drop_bad_fields[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "initial_posts.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Start with computing the labels\n",
    "# Find the initial posts where no one replied\n",
    "posts_with_replies = (initial_posts.join(\n",
    "        ids_in_reply,\n",
    "        col(\"ids_in_reply.In-Reply-To\") == col(\"initial_posts.Message-Id\"),\n",
    "        \"left_outer\")\n",
    "       .filter(col(\"ids_in_reply.In-Reply-To\").isNotNull())).cache()\n",
    "posts_with_replies.count()\n",
    "post_ids_with_replies = (posts_with_replies\n",
    "                            .select(col(\"initial_posts.Message-Id\").alias(\"id\"))\n",
    "                            .withColumn(\"has_reply\", lit(1.0))).alias(\"post_with_replies\")\n",
    "\n",
    "joined_posts = initial_posts.join(\n",
    "    post_ids_with_replies,\n",
    "    col(\"initial_posts.Message-Id\") == col(\"post_with_replies.id\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "joined_posts.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "posts_with_labels = joined_posts.na.fill({\"has_reply\": 0.0}).cache()\n",
    "posts_with_labels.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def extract_links(body):\n",
    "    import re\n",
    "    link_regex_str = r'(http(|s)://(.*?))([\\s\\n]|$)'\n",
    "    itr = re.finditer(link_regex_str, body, re.MULTILINE)\n",
    "    return list(map(lambda elem: elem.group(1), itr))\n",
    "\n",
    "def extract_domains(links):\n",
    "    from urllib.parse import urlparse\n",
    "    def extract_domain(link):\n",
    "        try:\n",
    "            nloc = urlparse(link).netloc\n",
    "            # We want to drop www and any extra spaces wtf nloc on the spaces.\n",
    "            regex_str = r'^(www\\.|)(.*?)\\s*$'\n",
    "            match = re.search(regex_str, nloc)\n",
    "            return match.group(2)\n",
    "        except:\n",
    "            return None\n",
    "    return list(map(extract_domain, links))\n",
    "\n",
    "def contains_python_stack_trace(body):\n",
    "    return \"Traceback (most recent call last)\" in body\n",
    "\n",
    "\n",
    "\n",
    "def contains_probably_java_stack_trace(body):\n",
    "    # Look for something based on regex\n",
    "    # Tried https://stackoverflow.com/questions/20609134/regular-expression-optional-multiline-java-stacktrace - more msg looking\n",
    "    # Tried https://stackoverflow.com/questions/3814327/regular-expression-to-parse-a-log-file-and-find-stacktraces\n",
    "    # Yes the compile is per call, but it's cached so w/e\n",
    "    import re\n",
    "    stack_regex_str = r'^\\s*(.+Exception.*):\\n(.*\\n){0,3}?(\\s+at\\s+.*\\(.*\\))+'\n",
    "    match = re.search(stack_regex_str, body, re.MULTILINE)\n",
    "    return match is not None\n",
    "\n",
    "\n",
    "def contains_exception_in_task(body):\n",
    "    # Look for a line along the lines of ERROR Executor: Exception in task \n",
    "    return \"ERROR Executor: Exception in task\" in body\n",
    "    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "extract_links_udf = UserDefinedFunction(\n",
    "    extract_links, ArrayType(StringType()), \"extract_links\")\n",
    "\n",
    "session.catalog._jsparkSession.udf().registerPython(\n",
    "    \"extract_links\",\n",
    "    extract_links_udf._judf)\n",
    "\n",
    "\n",
    "extract_domains_udf = UserDefinedFunction(\n",
    "    extract_domains, ArrayType(StringType()), \"extract_domains\")\n",
    "\n",
    "session.catalog._jsparkSession.udf().registerPython(\n",
    "    \"extract_domains\",\n",
    "    extract_domains_udf._judf)\n",
    "\n",
    "\n",
    "contains_python_stack_trace_udf = UserDefinedFunction(\n",
    "    contains_python_stack_trace, BooleanType(), \"contains_python_stack_trace\")\n",
    "\n",
    "session.catalog._jsparkSession.udf().registerPython(\n",
    "    \"contains_python_stack_trace\",\n",
    "    contains_python_stack_trace_udf._judf)\n",
    "\n",
    "\n",
    "contains_probably_java_stack_trace_udf = UserDefinedFunction(\n",
    "    contains_probably_java_stack_trace, BooleanType(), \"contains_probably_java_stack_trace\")\n",
    "\n",
    "session.catalog._jsparkSession.udf().registerPython(\n",
    "    \"contains_probably_java_stack_trace\",\n",
    "    contains_probably_java_stack_trace_udf._judf)\n",
    "\n",
    "\n",
    "contains_exception_in_task_udf = UserDefinedFunction(\n",
    "    contains_exception_in_task, BooleanType(), \"contains_exception_in_task\")\n",
    "\n",
    "session.catalog._jsparkSession.udf().registerPython(\n",
    "    \"contains_exception_in_task\",\n",
    "    contains_exception_in_task_udf._judf)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We could make this a transformer stage, but I'm lazy so we'll just use a UDF directly."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "annotated_spark_mailing_list_data = posts_with_labels.select(\n",
    "    \"*\",\n",
    "    extract_links_udf(posts_with_labels[\"body\"]).alias(\"links_in_email\"),\n",
    "    contains_python_stack_trace_udf(posts_with_labels.body).alias(\"contains_python_stack_trace\").cast(\"double\"),\n",
    "    contains_probably_java_stack_trace_udf(posts_with_labels.body).alias(\"contains_java_stack_trace\").cast(\"double\"),\n",
    "    contains_exception_in_task_udf(posts_with_labels.body).alias(\"contains_exception_in_task\").cast(\"double\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "annotated_spark_mailing_list_data.cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "annotated_spark_mailing_list_data.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "further_annotated = annotated_spark_mailing_list_data.withColumn(\n",
    "    \"domain_links\",\n",
    "    extract_domains_udf(annotated_spark_mailing_list_data.links_in_email))\n",
    "# Long story, allow mixed UDF types\n",
    "further_annotated.cache()\n",
    "further_annotated.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#tag::make_features[]\n",
    "tokenizer = Tokenizer(inputCol=\"body\", outputCol=\"body_tokens\")\n",
    "body_hashing = HashingTF(\n",
    "    inputCol=\"body_tokens\", outputCol=\"raw_body_features\",\n",
    "    numFeatures=10000)\n",
    "body_idf = IDF(\n",
    "    inputCol=\"raw_body_features\", outputCol=\"body_features\")\n",
    "body_word2Vec = Word2Vec(\n",
    "    vectorSize=5, minCount=0, numPartitions=10,\n",
    "    inputCol=\"body_tokens\", outputCol=\"body_vecs\")\n",
    "assembler = VectorAssembler(\n",
    "    inputCols=[\n",
    "        \"body_features\", \"body_vecs\", \"contains_python_stack_trace\",\n",
    "        \"contains_java_stack_trace\", \"contains_exception_in_task\"],\n",
    "    outputCol=\"features\")\n",
    "#end::make_features[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "featureprep_pipeline = Pipeline(\n",
    "    stages=[tokenizer, body_hashing, body_idf, body_word2Vec, assembler])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "featureprep_pipeline_transformer = featureprep_pipeline.fit(further_annotated)\n",
    "preped_data = featureprep_pipeline_transformer.transform(further_annotated)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "featureprep_pipeline_transformer.write().save(fs_prefix+\"/feature_prep-2\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "preped_data.write.format(\"parquet\").mode(\"overwrite\").save(fs_prefix+\"/prepared_data\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
